/**
Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
*/
module my.actor.actor;

import std.stdio : writeln, writefln;

import core.thread : Thread;
import logger = std.experimental.logger;
import std.algorithm : schwartzSort, max, min, among;
import std.array : empty;
import std.datetime : SysTime, Clock, dur;
import std.exception : collectException;
import std.functional : toDelegate;
import std.meta : staticMap;
import std.traits : Parameters, Unqual, ReturnType, isFunctionPointer, isFunction;
import std.typecons : Tuple, tuple;
import std.variant : Variant;

import my.actor.common : ExitReason, SystemError, makeSignature;
import my.actor.mailbox;
import my.actor.msg;
import my.actor.system : System;
import my.actor.typed : isTypedAddress, isTypedActorImpl;
import my.gc.refc;
import sumtype;

/// Destination of a promised reply: the address to reply to and the id of the request.
private struct PromiseData {
    WeakAddress replyTo;
    ulong replyId;

    /// Copy constructor
    this(ref return scope typeof(this) rhs) @safe nothrow @nogc {
        replyTo = rhs.replyTo;
        replyId = rhs.replyId;
    }

    @disable this(this);
}

/** A reply of type `T` that is delivered at a later point in time.
 *
 * `deliver` can only be called one time; the shared data is released when the
 * reply has been delivered.
 */
struct Promise(T) {
    package {
        RefCounted!PromiseData data;
    }

    /** Deliver the message `reply` passed by value.
     *
     * `reply` is an lvalue inside this function, so the call forwards to the
     * `ref` overload below; no extra copy is needed.
     */
    void deliver(T reply) {
        deliver(reply);
    }

    /** Deliver the message `reply`.
     *
     * A promise can only be delivered once.
     *
     * The reply is put in the mailbox of `replyTo` tagged with the stored
     * `replyId`. A `reply` that is not already a `Tuple` is wrapped in one.
     * Nothing is sent if `replyTo` can no longer be locked.
     */
    void deliver(ref T reply) @trusted
    in (!data.empty, "promise must be initialized") {
        // the contract may be compiled out, thus also guard at runtime.
        if (data.empty)
            return;
        // whatever happens, the promise is consumed by this call.
        scope (exit)
            data.release;

        // TODO: should probably call delivering actor with an ErrorMsg if replyTo is closed.
        if (auto replyTo = data.get.replyTo.lock.get) {
            enum wrapInTuple = !is(T : Tuple!U, U);
            static if (wrapInTuple)
                replyTo.put(Reply(data.get.replyId, Variant(tuple(reply))));
            else
                replyTo.put(Reply(data.get.replyId, Variant(reply)));
        }
    }

    /// Share the promise data of `rhs`.
    void opAssign(Promise!T rhs) {
        data = rhs.data;
    }

    /// True if the promise is not initialized.
    bool empty() {
        return data.empty || data.get.replyId == 0;
    }

    /// Clear the promise.
    void clear() {
        data.release;
    }
}

/// Returns: a promise with allocated, default initialized, promise data.
auto makePromise(T)() {
    return Promise!T(refCounted(PromiseData.init));
}

/** The result of a request handler.
 *
 * Holds exactly one of a value of type `T`, an `ErrorMsg` or a `Promise!T`.
 */
struct RequestResult(T) {
    this(T v) {
        value = typeof(value)(v);
    }

    this(ErrorMsg v) {
        value = typeof(value)(v);
    }

    this(Promise!T v) {
        value = typeof(value)(v);
    }

    SumType!(T, ErrorMsg, Promise!T) value;
}

private alias MsgHandler = void delegate(void* ctx, ref Variant msg) @safe;
private alias RequestHandler = void delegate(void* ctx, ref Variant msg,
        ulong replyId, WeakAddress replyTo) @safe;
private alias ReplyHandler = void delegate(void* ctx, ref Variant msg) @safe;

/// Called for a message that has no registered behavior.
alias DefaultHandler = void delegate(ref Actor self, ref Variant msg) @safe nothrow;

/** Actors send error messages to others by returning an error (see Errors)
 * from a message handler. Similar to exit messages, error messages usually
 * cause the receiving actor to terminate, unless a custom handler was
 * installed. The default handler is used as fallback if request is used
 * without error handler.
 */
alias ErrorHandler = void delegate(ref Actor self, ErrorMsg) @safe nothrow;

/** Bidirectional monitoring with a strong lifetime coupling is established by
 * calling a `LinkRequest` to an address. This will cause the runtime to send
 * an `ExitMsg` if either this or other dies. Per default, actors terminate
 * after receiving an `ExitMsg` unless the exit reason is exit_reason::normal.
 * This mechanism propagates failure states in an actor system. Linked actors
 * form a sub system in which an error causes all actors to fail collectively.
 */
alias ExitHandler = void delegate(ref Actor self, ExitMsg msg) @safe nothrow;

/// An exception has been thrown while processing a message.
alias ExceptionHandler = void delegate(ref Actor self, Exception e) @safe nothrow;

/** Actors can monitor the lifetime of other actors by sending a `MonitorRequest`
 * to an address. This will cause the runtime system to send a `DownMsg` for
 * other if it dies.
 *
 * Actors drop down messages unless they provide a custom handler.
 */
alias DownHandler = void delegate(ref Actor self, DownMsg msg) @safe nothrow;

/// Silently drop the message.
void defaultHandler(ref Actor self, ref Variant msg) @safe nothrow {
}

/// Write the name of the actor and the message type to the console.
void logAndDropHandler(ref Actor self, ref Variant msg) @trusted nothrow {
    import std.stdio : writeln;

    try {
        writeln("UNKNOWN message sent to actor ", self.name);
        writeln(msg.toString);
    } catch (Exception e) {
        // best effort logging; the handler must be nothrow.
    }
}

/// Record the reason of the error and start a clean shutdown of the actor.
void defaultErrorHandler(ref Actor self, ErrorMsg msg) @safe nothrow {
    self.lastError = msg.reason;
    self.shutdown;
}

/// Record the reason of the exit and force a shutdown of the actor.
void defaultExitHandler(ref Actor self, ExitMsg msg) @safe nothrow {
    self.lastError = msg.reason;
    self.forceShutdown;
}

/// Record a runtime error and force a shutdown of the actor.
void defaultExceptionHandler(ref Actor self, Exception e) @safe nothrow {
    self.lastError = SystemError.runtimeError;
    // TODO: should log?
    self.forceShutdown;
}

// Write the name of the actor and the exception to stdout.
void logExceptionHandler(ref Actor self, Exception e) @safe nothrow {
    import std.stdio : writeln;

    self.lastError = SystemError.runtimeError;

    try {
        writeln("EXCEPTION thrown by actor ", self.name);
        writeln(e.msg);
        writeln("TERMINATING");
    } catch (Exception ignored) {
        // best effort logging; the handler must be nothrow.
    }

    self.forceShutdown;
}

/// Timeout for an outstanding request.
struct ReplyHandlerTimeout {
    ulong id;
    SysTime timeout;
}

package enum ActorState {
    /// waiting to be started.
    waiting,
    /// active and processing messages.
    active,
    /// wait for all awaited responses to finish
    shutdown,
    /// discard also the awaited responses, just shutdown fast
    forceShutdown,
    /// in process of shutting down
    finishShutdown,
    /// stopped.
    stopped,
}

/// Behavior to run when an awaited reply arrive and the handler to call if it times out.
private struct AwaitReponse {
    Closure!(ReplyHandler, void*) behavior;
    ErrorHandler onError;
}

/** An actor that processes the messages delivered to its address.
 *
 * The work is driven from the outside by calling `process`.
 */
struct Actor {
    import std.container.rbtree : RedBlackTree, redBlackTree;

    package StrongAddress addr;
    // visible in the package for logging purpose.
    package ActorState state_ = ActorState.stopped;

    private {
        // TODO: rename to behavior.
        Closure!(MsgHandler, void*)[ulong] incoming;
        Closure!(RequestHandler, void*)[ulong] reqBehavior;

        // callbacks for awaited responses key:ed on their id.
        AwaitReponse[ulong] awaitedResponses;
        // kept sorted on the timeout, earliest first (see `register`).
        ReplyHandlerTimeout[] replyTimeouts;

        // important that it start at 1 because then zero is known to not be initialized.
        ulong nextReplyId = 1;

        /// Delayed messages ordered by their trigger time.
        RedBlackTree!(DelayedMsg*, "a.triggerAt < b.triggerAt", true) delayed;

        /// Used during shutdown to signal monitors and links why this actor is terminating.
        SystemError lastError;

        /// monitoring the actor lifetime.
        WeakAddress[size_t] monitors;

        /// strong, bidirectional link of the actors lifetime.
        WeakAddress[size_t] links;

        // Number of messages that has been processed.
        ulong messages_;

        /// System the actor belongs to.
        System* homeSystem_;

        string name_;

        ErrorHandler errorHandler_;

        /// callback for a received `DownMsg`. May be null.
        DownHandler downHandler_;

        ExitHandler exitHandler_;

        ExceptionHandler exceptionHandler_;

        DefaultHandler defaultHandler_;
    }

    invariant () {
        if (addr && !state_.among(ActorState.waiting, ActorState.shutdown)) {
            assert(errorHandler_);
            assert(exitHandler_);
            assert(exceptionHandler_);
            assert(defaultHandler_);
        }
    }

    /** Create an actor in the state `waiting` that owns the address `a`.
     *
     * The address is opened and the default handlers are installed. No down
     * handler is installed.
     */
    this(StrongAddress a) @trusted
    in (!a.empty, "address is empty") {
        state_ = ActorState.waiting;

        addr = a;
        addr.get.setOpen;
        delayed = new typeof(delayed);

        errorHandler_ = toDelegate(&defaultErrorHandler);
        downHandler_ = null;
        exitHandler_ = toDelegate(&defaultExitHandler);
        exceptionHandler_ = toDelegate(&defaultExceptionHandler);
        defaultHandler_ = toDelegate(&.defaultHandler);
    }

    /// Returns: a weak reference to the address of the actor.
    WeakAddress address() @safe {
        return addr.weakRef;
    }

    package ref StrongAddress addressRef() return @safe pure nothrow @nogc {
        return addr;
    }

    /// Returns: the system the actor belongs to. It must have been set with `setHomeSystem`.
    ref System homeSystem() @safe pure nothrow @nogc {
        return *homeSystem_;
    }

    /** Clean shutdown of the actor
     *
     * Stopping incoming messages from triggering new behavior and finish all
     * awaited responses.
     */
    void shutdown() @safe nothrow {
        if (state_.among(ActorState.waiting, ActorState.active))
            state_ = ActorState.shutdown;
    }

    /** Force an immediate shutdown.
     *
     * Stopping incoming messages from triggering new behavior and discard all
     * awaited responses.
     */
    void forceShutdown() @safe nothrow {
        if (state_.among(ActorState.waiting, ActorState.active, ActorState.shutdown))
            state_ = ActorState.forceShutdown;
    }

    ulong id() @safe pure nothrow const @nogc {
        return addr.id;
    }

    /// Returns: the name of the actor.
    string name() @safe pure nothrow const @nogc {
        return name_;
    }

    // dfmt off

    /// Set the name of the actor.
    void name(string n) @safe pure nothrow @nogc {
        this.name_ = n;
    }

    void errorHandler(ErrorHandler v) @safe pure nothrow @nogc {
        errorHandler_ = v;
    }

    void downHandler(DownHandler v) @safe pure nothrow @nogc {
        downHandler_ = v;
    }

    void exitHandler(ExitHandler v) @safe pure nothrow @nogc {
        exitHandler_ = v;
    }

    void exceptionHandler(ExceptionHandler v) @safe pure nothrow @nogc {
        exceptionHandler_ = v;
    }

    void defaultHandler(DefaultHandler v) @safe pure nothrow @nogc {
        defaultHandler_ = v;
    }

    // dfmt on

package:
    bool hasMessage() @safe pure nothrow @nogc {
        return addr && addr.get.hasMessage;
    }

    /// How long until a delayed message or a timeout fires.
    Duration nextTimeout(const SysTime now, const Duration default_) @safe {
        return min(delayed.empty ? default_ : (delayed.front.triggerAt - now),
                replyTimeouts.empty ? default_ : (replyTimeouts[0].timeout - now));
    }

    bool waitingForReply() @safe pure nothrow const @nogc {
        return !awaitedResponses.empty;
    }

    /// Number of messages that has been processed.
    ulong messages() @safe pure nothrow const @nogc {
        return messages_;
    }

    void setHomeSystem(System* sys) @safe pure nothrow @nogc {
        homeSystem_ = sys;
    }

    /// Free the context of all registered message and request behaviors and forget them.
    void cleanupBehavior() @trusted nothrow {
        foreach (ref a; incoming.byValue) {
            try {
                a.free;
            } catch (Exception e) {
                // TODO: call exceptionHandler?
            }
        }
        incoming = null;
        foreach (ref a; reqBehavior.byValue) {
            try {
                a.free;
            } catch (Exception e) {
            }
        }
        reqBehavior = null;
    }

    /// Free the context of all awaited responses and forget them.
    void cleanupAwait() @trusted nothrow {
        foreach (ref a; awaitedResponses.byValue) {
            try {
                a.behavior.free;
            } catch (Exception e) {
            }
        }
        awaitedResponses = null;
    }

    /// Drop all delayed messages.
    void cleanupDelayed() @trusted nothrow {
        foreach (const _; 0 .. delayed.length) {
            try {
                delayed.front.msg = Msg.init;
                delayed.removeFront;
            } catch (Exception e) {
            }
        }
        .destroy(delayed);
    }

    /// Returns: true for every state except `stopped`.
    bool isAlive() @safe pure nothrow const @nogc {
        final switch (state_) {
        case ActorState.waiting:
            goto case;
        case ActorState.active:
            goto case;
        case ActorState.shutdown:
            goto case;
        case ActorState.forceShutdown:
            goto case;
        case ActorState.finishShutdown:
            return true;
        case ActorState.stopped:
            return false;
        }
    }

    /// Accepting messages.
    bool isAccepting() @safe pure nothrow const @nogc {
        final switch (state_) {
        case ActorState.waiting:
            goto case;
        case ActorState.active:
            goto case;
        case ActorState.shutdown:
            return true;
        case ActorState.forceShutdown:
            goto case;
        case ActorState.finishShutdown:
            goto case;
        case ActorState.stopped:
            return false;
        }
    }

    /// Returns: a new id to use for a request.
    ulong replyId() @safe {
        return nextReplyId++;
    }

    /** Run one step of the actor's state machine.
     *
     * The counter returned by `messages` is reset at the start of each call.
     *
     * Params:
     *  now = the time used to decide which delayed messages and timeouts that have triggered.
     */
    void process(const SysTime now) @safe nothrow {
        import core.memory : GC;

        assert(!GC.inFinalizer);

        messages_ = 0;

        void tick() {
            // philosophy of the order is that a timeout should only trigger if it
            // is really required thus it is checked last. This order then mean
            // that a request may have triggered a timeout but because
            // `processReply` is called before `checkReplyTimeout` it is *ignored*.
            // Thus "better to accept even if it is timeout rather than fail".
            try {
                processSystemMsg();
                processDelayed(now);
                processIncoming();
                processReply();
                checkReplyTimeout(now);
            } catch (Exception e) {
                exceptionHandler_(this, e);
            }
        }

        assert(state_ == ActorState.stopped || addr, "no address");

        final switch (state_) {
        case ActorState.waiting:
            state_ = ActorState.active;
            tick;
            // the state can be changed before the actor have executed.
            break;
        case ActorState.active:
            tick;
            // self terminate if the actor has no behavior.
            if (incoming.empty && awaitedResponses.empty && reqBehavior.empty)
                state_ = ActorState.forceShutdown;
            break;
        case ActorState.shutdown:
            tick;
            if (awaitedResponses.empty)
                state_ = ActorState.finishShutdown;
            // unconditional, the behaviors are dropped on the first pass
            // while the awaited responses are kept until they are done.
            cleanupBehavior;
            break;
        case ActorState.forceShutdown:
            state_ = ActorState.finishShutdown;
            cleanupBehavior;
            addr.get.setClosed;
            break;
        case ActorState.finishShutdown:
            state_ = ActorState.stopped;

            sendToMonitors(DownMsg(addr.weakRef, lastError));

            sendToLinks(ExitMsg(addr.weakRef, lastError));

            replyTimeouts = null;
            cleanupDelayed;
            cleanupAwait;

            // must be last because sendToLinks and sendToMonitors uses addr.
            addr.get.shutdown();
            addr.release;
            break;
        case ActorState.stopped:
            break;
        }
    }

    /// Send `msg` to all monitors that still can be reached and then forget the monitors.
    void sendToMonitors(DownMsg msg) @safe nothrow {
        foreach (ref a; monitors.byValue) {
            try {
                if (auto rc = a.lock.get)
                    rc.put(SystemMsg(msg));
                a.release;
            } catch (Exception e) {
            }
        }

        monitors = null;
    }

    /// Send `msg` to all links that still can be reached and then forget the links.
    void sendToLinks(ExitMsg msg) @safe nothrow {
        foreach (ref a; links.byValue) {
            try {
                if (auto rc = a.lock.get)
                    rc.put(SystemMsg(msg));
                a.release;
            } catch (Exception e) {
            }
        }

        links = null;
    }

    /** Call the error handler of each awaited response that has timed out.
     *
     * Relies on `replyTimeouts` being sorted, the scan stops at the first
     * timeout that has not yet passed.
     */
    void checkReplyTimeout(const SysTime now) @safe {
        if (replyTimeouts.empty)
            return;

        size_t removeTo;
        foreach (const i; 0 .. replyTimeouts.length) {
            if (now > replyTimeouts[i].timeout) {
                const id = replyTimeouts[i].id;
                if (auto v = id in awaitedResponses) {
                    messages_++;
                    v.onError(this, ErrorMsg(addr.weakRef, SystemError.requestTimeout));
                    try {
                        () @trusted { v.behavior.free; }();
                    } catch (Exception e) {
                    }
                    awaitedResponses.remove(id);
                }
                removeTo = i + 1;
            } else {
                break;
            }
        }

        if (removeTo >= replyTimeouts.length) {
            replyTimeouts = null;
        } else if (removeTo != 0) {
            replyTimeouts = replyTimeouts[removeTo .. $];
        }
    }

    /** Process at most one incoming message.
     *
     * The message is dispatched on its signature to the registered behavior,
     * or to the default handler when there is none.
     */
    void processIncoming() @safe {
        if (addr.get.empty!Msg)
            return;
        messages_++;

        auto front = addr.get.pop!Msg;
        scope (exit)
            .destroy(front);

        void doSend(ref MsgOneShot msg) {
            if (auto v = front.get.signature in incoming) {
                (*v)(msg.data);
            } else {
                defaultHandler_(this, msg.data);
            }
        }

        void doRequest(ref MsgRequest msg) @trusted {
            if (auto v = front.get.signature in reqBehavior) {
                (*v)(msg.data, msg.replyId, msg.replyTo);
            } else {
                defaultHandler_(this, msg.data);
            }
        }

        front.get.type.match!((ref MsgOneShot a) { doSend(a); }, (ref MsgRequest a) {
            doRequest(a);
        });
    }

    /** All system messages are handled.
     *
     * Assuming:
     *  * they are not heavy to process
     *  * very important that if there are any they should be handled as soon as possible
     *  * ignoring the case when there is a "storm" of system messages which
     *    "could" overload the actor system and lead to a crash. I classify this,
     *    for now, as intentional, malicious coding by the developer themself.
     *    External inputs that could trigger such a behavior should be controlled
     *    and limited. Other types of input such as a developer trying to break
     *    the actor system is out of scope.
     */
    void processSystemMsg() @safe {
        //() @trusted {
        //logger.infof("run %X", cast(void*) &this);
        //}();
        while (!addr.get.empty!SystemMsg) {
            messages_++;
            //logger.infof("%X %s %s", addr.toHash, state_, messages_);
            auto front = addr.get.pop!SystemMsg;
            scope (exit)
                .destroy(front);

            front.get.match!((ref DownMsg a) {
                if (downHandler_)
                    downHandler_(this, a);
            }, (ref MonitorRequest a) { monitors[a.addr.toHash] = a.addr; }, (ref DemonitorRequest a) {
                if (auto v = a.addr.toHash in monitors)
                    v.release;
                monitors.remove(a.addr.toHash);
            }, (ref LinkRequest a) { links[a.addr.toHash] = a.addr; }, (ref UnlinkRequest a) {
                if (auto v = a.addr.toHash in links)
                    v.release;
                links.remove(a.addr.toHash);
            }, (ref ErrorMsg a) { errorHandler_(this, a); }, (ref ExitMsg a) {
                exitHandler_(this, a);
            }, (ref SystemExitMsg a) {
                final switch (a.reason) {
                case ExitReason.normal:
                    break;
                case ExitReason.unhandledException:
                    exitHandler_(this, ExitMsg.init);
                    break;
                case ExitReason.unknown:
                    exitHandler_(this, ExitMsg.init);
                    break;
                case ExitReason.userShutdown:
                    exitHandler_(this, ExitMsg.init);
                    break;
                case ExitReason.kill:
                    exitHandler_(this, ExitMsg.init);
                    // the user do NOT have an option here
                    forceShutdown;
                    break;
                }
            });
        }
    }

    /** Process at most one reply.
     *
     * A reply that is awaited is passed to its behavior which then is freed
     * and forgotten together with its timeout. Any other reply is passed to
     * the default handler.
     */
    void processReply() @safe {
        if (addr.get.empty!Reply)
            return;
        messages_++;

        auto front = addr.get.pop!Reply;
        scope (exit)
            .destroy(front);

        if (auto v = front.get.id in awaitedResponses) {
            // TODO: reduce the lookups on front.id
            v.behavior(front.get.data);
            try {
                () @trusted { v.behavior.free; }();
            } catch (Exception e) {
            }
            awaitedResponses.remove(front.get.id);
            removeReplyTimeout(front.get.id);
        } else {
            // TODO: should probably be SystemError.unexpectedResponse?
            defaultHandler_(this, front.get.data);
        }
    }

    /** Move at most one newly arrived delayed message to the ordered set and
     * put all delayed messages that have triggered in the mailbox.
     */
    void processDelayed(const SysTime now) @trusted {
        if (!addr.get.empty!DelayedMsg) {
            // count as a message because handling them are "expensive".
            // Ignoring the case that the message right away is moved to the
            // incoming queue. This lead to "double accounting" but ohh well.
            // Don't use delayedSend when you should have used send.
            messages_++;
            delayed.insert(addr.get.pop!DelayedMsg.unsafeMove);
        } else if (delayed.empty) {
            return;
        }

        foreach (const _; 0 .. delayed.length) {
            if (now > delayed.front.triggerAt) {
                addr.get.put(delayed.front.msg);
                delayed.removeFront;
            } else {
                break;
            }
        }
    }

    /** Remove the timeout registered for the reply `id`, if there is one.
     *
     * `remove` only shifts the elements and returns the shortened range, thus
     * the result must be assigned back. Otherwise the array keeps its length
     * with a stale entry at the end.
     */
    private void removeReplyTimeout(ulong id) @safe nothrow {
        import std.algorithm : remove;

        foreach (const i; 0 .. replyTimeouts.length) {
            if (replyTimeouts[i].id == id) {
                replyTimeouts = remove(replyTimeouts, i);
                break;
            }
        }
    }

    /// Register, or replace, the behavior for messages with `signature`.
    void register(ulong signature, Closure!(MsgHandler, void*) handler) @trusted {
        if (!isAccepting)
            return;

        if (auto v = signature in incoming) {
            try {
                v.free;
            } catch (Exception e) {
            }
        }
        incoming[signature] = handler;
    }

    /// Register, or replace, the behavior for requests with `signature`.
    void register(ulong signature, Closure!(RequestHandler, void*) handler) @trusted {
        if (!isAccepting)
            return;

        if (auto v = signature in reqBehavior) {
            try {
                v.free;
            } catch (Exception e) {
            }
        }
        reqBehavior[signature] = handler;
    }

    /** Register the behavior `reply` for the awaited response `replyId`.
     *
     * Params:
     *  replyId = id of the request that is awaited.
     *  timeout = when the response is considered to have timed out.
     *  reply = behavior to run when the response arrive.
     *  onError = called on timeout. The actor's error handler is used if it is null.
     */
    void register(ulong replyId, SysTime timeout, Closure!(ReplyHandler,
            void*) reply, ErrorHandler onError) @safe {
        if (!isAccepting)
            return;

        awaitedResponses[replyId] = AwaitReponse(reply, onError is null ? errorHandler_ : onError);
        replyTimeouts ~= ReplyHandlerTimeout(replyId, timeout);
        // keep the earliest timeout first, `checkReplyTimeout` and `nextTimeout` depend on it.
        schwartzSort!(a => a.timeout, (a, b) => a < b)(replyTimeouts);
    }
}

/** A callable `fn` together with a context that is passed as its first
 * argument and a function used to clean up the context.
 */
struct Closure(Fn, CtxT) {
    alias FreeFn = void function(CtxT);

    Fn fn;
    CtxT ctx;
    FreeFn cleanup;

    this(Fn fn) {
        this.fn = fn;
    }

    // `ctx` has the same type as the field it initializes. It was previously
    // declared as `CtxT*` which only accepted `null` or a pointer to a context.
    this(Fn fn, CtxT ctx, FreeFn cleanup) {
        this.fn = fn;
        this.ctx = ctx;
        this.cleanup = cleanup;
    }

    /// Call `fn` with the context followed by `args`.
    void opCall(Args...)(auto ref Args args) {
        assert(fn !is null);
        fn(ctx, args);
    }

    /// Clean up and reset the context.
    void free() {
        // will crash, on purpose, if there is a ctx and no cleanup registered.
        // maybe a bad idea? dunno... lets see
        if (ctx)
            cleanup(ctx);
        ctx = CtxT.init;
    }
}

@("shall register a behavior to be called when msg received matching signature")
unittest {
    auto addr = makeAddress2;
    auto actor = Actor(addr);

    bool processedIncoming;
    void fn(void* ctx, ref Variant msg) {
        processedIncoming = true;
    }

    actor.register(1, Closure!(MsgHandler, void*)(&fn));
    addr.get.put(Msg(1, MsgType(MsgOneShot(Variant(42)))));

    actor.process(Clock.currTime);

    assert(processedIncoming);
}

/** Destroy the mutable fields of the context that `ctx` points to.
 *
 * Fields that are const or immutable are left untouched. Nothing is done when
 * `CtxT` is `void`.
 */
private void cleanupCtx(CtxT)(void* ctx)
        if (is(CtxT == Tuple!T, T) || is(CtxT == void)) {
    import std.traits;
    import my.actor.typed;

    static if (!is(CtxT == void)) {
        // trust that any use of this also pass on the correct context type.
        auto userCtx = () @trusted { return cast(CtxT*) ctx; }();
        // release the context such as if it holds a rc object.
        alias Types = CtxT.Types;

        static foreach (const i; 0 .. Types.length) {
            {
                alias T = Types[i];
                alias UT = Unqual!T;
                static if (!is(T == UT)) {
                    static assert(!is(UT : WeakAddress),
                            "WeakAddress must NEVER be const or immutable");
                    static assert(!is(UT : TypedAddress!M, M...),
                            "WeakAddress must NEVER be const or immutable: " ~ T.stringof);
                }
                // TODO: add a -version actor_ctx_diagnostic that prints when it is unable to deinit?

                static if (is(UT == T)) {
                    .destroy((*userCtx)[i]);
                }
            }
        }
    }
}

@("shall default initialize when possible, skipping const/immutable")
unittest {
    {
        auto x = tuple(cast(const) 42, 43);
        alias T = typeof(x);
        cleanupCtx!T(cast(void*)&x);
        assert(x[0] == 42); // can't assign to const
        assert(x[1] == 0);
    }

    {
        import my.path : Path;

        auto x = tuple(Path.init, cast(const) Path("foo"));
        alias T = typeof(x);
        cleanupCtx!T(cast(void*)&x);
        assert(x[0] == Path.init);
        assert(x[1] == Path("foo"));
    }
}

package struct Action {
    Closure!(MsgHandler, void*) action;
    ulong signature;
}

/// A behavior for an actor when it receives a message of `signature`.
package auto makeAction(T, CtxT = void)(T handler) @safe
        if (isFunction!T || isFunctionPointer!T) {
    static if (is(CtxT == void))
        alias Params = Parameters!T;
    else {
        // the first parameter is the context, the rest is the message.
        alias CtxParam = Parameters!T[0];
        alias Params = Parameters!T[1 .. $];
        checkMatchingCtx!(CtxParam, CtxT);
        checkRefForContext!handler;
    }

    alias HArgs = staticMap!(Unqual, Params);

    void fn(void* ctx, ref Variant msg) @trusted {
        static if (is(CtxT == void)) {
            handler(msg.get!(Tuple!HArgs).expand);
        } else {
            auto userCtx = cast(CtxParam*) cast(CtxT*) ctx;
            handler(*userCtx, msg.get!(Tuple!HArgs).expand);
        }
    }

    // the context is null here, it is assigned by the one that builds the actor.
    return Action(typeof(Action.action)(&fn, null, &cleanupCtx!CtxT), makeSignature!HArgs);
}

/// A behavior to run when the reply of a request arrive.
package Closure!(ReplyHandler, void*) makeReply(T, CtxT)(T handler) @safe {
    static if (is(CtxT == void))
        alias Params = Parameters!T;
    else {
        // the first parameter is the context, the rest is the message.
        alias CtxParam = Parameters!T[0];
        alias Params = Parameters!T[1 .. $];
        checkMatchingCtx!(CtxParam, CtxT);
        checkRefForContext!handler;
    }

    alias HArgs = staticMap!(Unqual, Params);

    void fn(void* ctx, ref Variant msg) @trusted {
        static if (is(CtxT == void)) {
            handler(msg.get!(Tuple!HArgs).expand);
        } else {
            auto userCtx = cast(CtxParam*) cast(CtxT*) ctx;
            handler(*userCtx, msg.get!(Tuple!HArgs).expand);
        }
    }

    return typeof(return)(&fn, null, &cleanupCtx!CtxT);
}

package struct Request {
    Closure!(RequestHandler, void*) request;
    ulong signature;
}

/// Returns: the location formatted as `Loc[0]:Loc[1]:Loc[2]`.
private string locToString(Loc...)() {
    import std.conv : to;

    return Loc[0] ~ ":" ~ Loc[1].to!string ~ ":" ~ Loc[2].to!string;
}

/// Check that the context parameter is `ref` otherwise issue a warning.
package void checkRefForContext(alias handler)() {
    import std.traits : ParameterStorageClass, ParameterStorageClassTuple;

    // storage class of the first parameter, which is the context.
    alias CtxParam = ParameterStorageClassTuple!(typeof(handler))[0];

    static if (CtxParam != ParameterStorageClass.ref_) {
        // print the offending type before the assert stops the compilation.
        pragma(msg, "INFO: handler type is " ~ typeof(handler).stringof);
        static assert(CtxParam == ParameterStorageClass.ref_,
                "The context must be `ref` to avoid unnecessary copying");
    }
}

/** Check that the first parameter of a handler, `CtxParam`, is the context
 * type `CtxT` or can be constructed from the expanded fields of a `CtxT`.
 */
package void checkMatchingCtx(CtxParam, CtxT)() {
    static if (!is(CtxT == CtxParam)) {
        static assert(__traits(compiles, { auto x = CtxParam(CtxT.init.expand); }),
                "mismatch between the context type " ~ CtxT.stringof
                ~ " and the first parameter " ~ CtxParam.stringof);
    }
}

/** A behavior for an actor when it receives a request.
 *
 * What `handler` returns decides how the requester is answered:
 *  * a `RequestResult` holding an `ErrorMsg` is sent as a system message.
 *  * a `Promise`, returned as is or in a `RequestResult`, is bound to the
 *    request. Nothing is sent by this function.
 *  * any other value is sent as a `Reply`, wrapped in a `Tuple` if it is not
 *    already one.
 *
 * A `handler` that returns `void` is rejected at compile time.
 */
package auto makeRequest(T, CtxT = void)(T handler) @safe {
    static assert(!is(ReturnType!T == void), "handler returns void, not allowed");

    alias RType = ReturnType!T;
    enum isReqResult = is(RType : RequestResult!ReqT, ReqT);
    enum isPromise = is(RType : Promise!PromT, PromT);

    static if (is(CtxT == void))
        alias Params = Parameters!T;
    else {
        // the first parameter is the context, the rest is the message.
        alias CtxParam = Parameters!T[0];
        alias Params = Parameters!T[1 .. $];
        checkMatchingCtx!(CtxParam, CtxT);
        checkRefForContext!handler;
    }

    alias HArgs = staticMap!(Unqual, Params);

    void fn(void* rawCtx, ref Variant msg, ulong replyId, WeakAddress replyTo) @trusted {
        static if (is(CtxT == void)) {
            auto r = handler(msg.get!(Tuple!HArgs).expand);
        } else {
            auto ctx = cast(CtxParam*) cast(CtxT*) rawCtx;
            auto r = handler(*ctx, msg.get!(Tuple!HArgs).expand);
        }

        static if (isReqResult) {
            r.value.match!((ErrorMsg a) { sendSystemMsg(replyTo, a); }, (Promise!ReqT a) {
                assert(!a.data.empty, "the promise MUST be constructed before it is returned");
                a.data.get.replyId = replyId;
                a.data.get.replyTo = replyTo;
            }, (data) {
                enum wrapInTuple = !is(typeof(data) : Tuple!U, U);
                if (auto rc = replyTo.lock.get) {
                    static if (wrapInTuple)
                        rc.put(Reply(replyId, Variant(tuple(data))));
                    else
                        rc.put(Reply(replyId, Variant(data)));
                }
            });
        } else static if (isPromise) {
            // same requirement as for a promise returned in a RequestResult.
            assert(!r.data.empty, "the promise MUST be constructed before it is returned");
            r.data.get.replyId = replyId;
            r.data.get.replyTo = replyTo;
        } else {
            // TODO: confirm if `U` in the is-expression matches one type or a
            // sequence of types. The intent is a sequence.
            enum wrapInTuple = !is(RType : Tuple!U, U);
            if (auto rc = replyTo.lock.get) {
                static if (wrapInTuple)
                    rc.put(Reply(replyId, Variant(tuple(r))));
                else
                    rc.put(Reply(replyId, Variant(r)));
            }
        }
    }

    // the context is null here, it is assigned by the one that builds the actor.
    return Request(typeof(Request.request)(&fn, null, &cleanupCtx!CtxT), makeSignature!HArgs);
}

@("shall link two actors lifetime")
unittest {
    int count;
    void countExits(ref Actor self, ExitMsg msg) @safe nothrow {
        count++;
        self.shutdown;
    }

    auto aa1 = Actor(makeAddress2);
    auto a1 = build(&aa1).set((int x) {}).exitHandler_(&countExits).finalize;
    auto aa2 = Actor(makeAddress2);
    auto a2 = build(&aa2).set((int x) {}).exitHandler_(&countExits).finalize;

    a1.linkTo(a2.address);
    a1.process(Clock.currTime);
    a2.process(Clock.currTime);

    assert(a1.isAlive);
    assert(a2.isAlive);

    sendExit(a1.address, ExitReason.userShutdown);
    foreach (_; 0 .. 5) {
        a1.process(Clock.currTime);
        a2.process(Clock.currTime);
    }

    assert(!a1.isAlive);
    assert(!a2.isAlive);
    assert(count == 2);
}

@("shall let one actor monitor the lifetime of the other one")
unittest {
    int count;
    void downMsg(ref Actor self, DownMsg msg) @safe nothrow {
        count++;
    }

    auto aa1 = Actor(makeAddress2);
    auto a1 = build(&aa1).set((int x) {}).downHandler_(&downMsg).finalize;
    auto aa2 = Actor(makeAddress2);
    auto a2 = build(&aa2).set((int x) {}).finalize;

    a1.monitor(a2.address);
    a1.process(Clock.currTime);
    a2.process(Clock.currTime);

    assert(a1.isAlive);
    assert(a2.isAlive);

    sendExit(a2.address, ExitReason.userShutdown);
    foreach (_; 0 .. 5) {
        a1.process(Clock.currTime);
        a2.process(Clock.currTime);
    }

    assert(a1.isAlive);
    assert(!a2.isAlive);
    assert(count == 1);
}

/** Builder that configures the handlers and behaviors of an actor.
 *
 * The setters return the builder to allow chaining. `finalize` hands back the
 * actor.
 */
private struct BuildActor {
    Actor* actor;

    /// Returns: the configured actor. The builder let go of it.
    Actor* finalize() @safe {
        auto rval = actor;
        actor = null;
        return rval;
    }

    auto errorHandler(ErrorHandler a) {
        actor.errorHandler = a;
        return this;
    }

    auto downHandler_(DownHandler a) {
        actor.downHandler_ = a;
        return this;
    }

    auto exitHandler_(ExitHandler a) {
        actor.exitHandler_ = a;
        return this;
    }

    auto exceptionHandler_(ExceptionHandler a) {
        actor.exceptionHandler_ = a;
        return this;
    }

    auto defaultHandler_(DefaultHandler a) {
        actor.defaultHandler_ = a;
        return this;
    }

    /// Register `behavior`, which returns a value, as a request behavior.
    auto set(BehaviorT)(BehaviorT behavior)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && !is(ReturnType!BehaviorT == void)) {
        auto act = makeRequest(behavior);
        actor.register(act.signature, act.request);
        return this;
    }

    /// Register `behavior`, which returns a value, as a request behavior with a copy of `c` as context.
    auto set(BehaviorT, CT)(BehaviorT behavior, CT c)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && !is(ReturnType!BehaviorT == void)) {
        auto act = makeRequest!(BehaviorT, CT)(behavior);
        // for now just use the GC to allocate the context on.
        // TODO: use an allocator.
        act.request.ctx = cast(void*) new CT(c);
        actor.register(act.signature, act.request);
        return this;
    }

    /// Register `behavior`, which returns void, as a message behavior.
    auto set(BehaviorT)(BehaviorT behavior)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && is(ReturnType!BehaviorT == void)) {
        auto act = makeAction(behavior);
        actor.register(act.signature, act.action);
        return this;
    }

    /// Register `behavior`, which returns void, as a message behavior with a copy of `c` as context.
    auto set(BehaviorT, CT)(BehaviorT behavior, CT c)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && is(ReturnType!BehaviorT == void)) {
        auto act = makeAction!(BehaviorT, CT)(behavior);
        // for now just use the GC to allocate the context on.
        // TODO: use an allocator.
        act.action.ctx = cast(void*) new CT(c);
        actor.register(act.signature, act.action);
        return this;
    }
}

package BuildActor build(Actor* a) @safe {
    return BuildActor(a);
}

/** Implement an actor.
 *
 * Each behavior is registered on `self`. A behavior that is directly followed
 * by a capture is registered with the capture as its context.
 *
 * Returns: `self`.
 */
Actor* impl(Behavior...)(Actor* self, Behavior behaviors) {
    import my.actor.msg : isCapture, Capture;

    auto bactor = build(self);
    static foreach (const i; 0 .. Behavior.length) {
        {
            alias b = Behavior[i];

            // a capture is consumed together with the behavior before it.
            static if (!isCapture!b) {
                static if (!(isFunction!(b) || isFunctionPointer!(b)))
                    static assert(0, "behavior may only be functions, not delgates: " ~ b.stringof);

                static if (i + 1 < Behavior.length && isCapture!(Behavior[i + 1])) {
                    bactor.set(behaviors[i], behaviors[i + 1]);
                } else
                    bactor.set(behaviors[i]);
            }
        }
    }

    return bactor.finalize;
}

@("build dynamic actor from functions")
unittest {
    static void fn3(int s) @safe {
    }

    static string fn4(int s) @safe {
        return "foo";
    }

    static Tuple!(int, string) fn5(const string s) @safe {
        return typeof(return)(42, "hej");
    }

    auto aa1 = Actor(makeAddress2);
    auto a1 = build(&aa1).set(&fn3).set(&fn4).set(&fn5).finalize;
}

unittest {
    bool delayOk;
    static void fn1(ref Tuple!(bool*, "delayOk") c, const string s) @safe {
        *c.delayOk = true;
    }

    bool delayShouldNeverHappen;
    static void fn2(ref Tuple!(bool*, "delayShouldNeverHappen") c, int s) @safe {
        *c.delayShouldNeverHappen = true;
    }

    auto aa1 = Actor(makeAddress2);
    auto actor = build(&aa1).set(&fn1, capture(&delayOk)).set(&fn2,
            capture(&delayShouldNeverHappen)).finalize;
    delayedSend(actor.address, Clock.currTime - 1.dur!"seconds", "foo");
    delayedSend(actor.address, Clock.currTime + 1.dur!"hours", 42);

    assert(!actor.addressRef.get.empty!DelayedMsg);
    assert(actor.addressRef.get.empty!Msg);
    assert(actor.addressRef.get.empty!Reply);

    actor.process(Clock.currTime);

    assert(!actor.addressRef.get.empty!DelayedMsg);
    assert(actor.addressRef.get.empty!Msg);
    assert(actor.addressRef.get.empty!Reply);

    actor.process(Clock.currTime);
    actor.process(Clock.currTime);

    assert(actor.addressRef.get.empty!DelayedMsg);
    assert(actor.addressRef.get.empty!Msg);
    assert(actor.addressRef.get.empty!Reply);

    assert(delayOk);
    assert(!delayShouldNeverHappen);
}

@("shall process a request->then chain xyz")
@system unittest {
    // checking capture is correctly setup/teardown by using captured rc.

    auto rcReq = refCounted(42);
    bool calledOk;
    static string fn(ref Tuple!(bool*, "calledOk", RefCounted!int) ctx, const string s,
            const string b) {
        assert(2 == ctx[1].refCount);
        if (s == "apa")
            *ctx.calledOk = true;
        return "foo";
    }

    auto rcReply = refCounted(42);
    bool calledReply;
    static void reply(ref Tuple!(bool*, RefCounted!int) ctx, const string s) {
        *ctx[0] = s == "foo";
        assert(2 == ctx[1].refCount);
    }

    auto aa1 = Actor(makeAddress2);
    auto actor = build(&aa1).set(&fn, capture(&calledOk, rcReq)).finalize;

    assert(2 == rcReq.refCount);
    assert(1 == rcReply.refCount);

    actor.request(actor.address, infTimeout).send("apa", "foo")
        .capture(&calledReply, rcReply).then(&reply);
    assert(2 == rcReply.refCount);

    assert(!actor.addr.get.empty!Msg);
    assert(actor.addr.get.empty!Reply);

    actor.process(Clock.currTime);
    assert(actor.addr.get.empty!Msg);
    assert(actor.addr.get.empty!Reply);

    assert(2 == rcReq.refCount);
    assert(1 == rcReply.refCount, "after the message is consumed the refcount should go back");

    assert(calledOk);
    assert(calledReply);

    actor.shutdown;
    while (actor.isAlive)
        actor.process(Clock.currTime);
}

@("shall process a request->then chain using promises")
unittest {
    static struct A {
        string v;
    }

    static struct B {
        string v;
    }

    int calledOk;
    auto fn1p = makePromise!string;
    static RequestResult!string fn1(ref Capture!(int*, "calledOk", Promise!string, "p") c, A a) @trusted {
if (a.v == "apa") 1298 (*c.calledOk)++; 1299 return typeof(return)(c.p); 1300 } 1301 1302 auto fn2p = makePromise!string; 1303 static Promise!string fn2(ref Capture!(int*, "calledOk", Promise!string, "p") c, B a) { 1304 (*c.calledOk)++; 1305 return c.p; 1306 } 1307 1308 int calledReply; 1309 static void reply(ref Tuple!(int*) ctx, const string s) { 1310 if (s == "foo") 1311 *ctx[0] += 1; 1312 } 1313 1314 auto aa1 = Actor(makeAddress2); 1315 auto actor = build(&aa1).set(&fn1, capture(&calledOk, fn1p)).set(&fn2, 1316 capture(&calledOk, fn2p)).finalize; 1317 1318 actor.request(actor.address, infTimeout).send(A("apa")).capture(&calledReply).then(&reply); 1319 actor.request(actor.address, infTimeout).send(B("apa")).capture(&calledReply).then(&reply); 1320 1321 actor.process(Clock.currTime); 1322 assert(calledOk == 1); // first request 1323 assert(calledReply == 0); 1324 1325 fn1p.deliver("foo"); 1326 1327 assert(calledReply == 0); 1328 1329 actor.process(Clock.currTime); 1330 assert(calledOk == 2); // second request triggered 1331 assert(calledReply == 1); 1332 1333 fn2p.deliver("foo"); 1334 actor.process(Clock.currTime); 1335 1336 assert(calledReply == 2); 1337 1338 actor.shutdown; 1339 while (actor.isAlive) { 1340 actor.process(Clock.currTime); 1341 } 1342 } 1343 1344 /// The timeout triggered. 1345 class ScopedActorException : Exception { 1346 this(ScopedActorError err, string file = __FILE__, int line = __LINE__) @safe pure nothrow { 1347 super(null, file, line); 1348 error = err; 1349 } 1350 1351 ScopedActorError error; 1352 } 1353 1354 enum ScopedActorError : ubyte { 1355 none, 1356 // actor address is down 1357 down, 1358 // request timeout 1359 timeout, 1360 // the address where unable to process the received message 1361 unknownMsg, 1362 // some type of fatal error occured. 1363 fatal, 1364 } 1365 1366 /** Intended to be used in a local scope by a user. 1367 * 1368 * `ScopedActor` is not thread safe. 
1369 */ 1370 struct ScopedActor { 1371 import my.actor.typed : underlyingAddress, underlyingWeakAddress; 1372 1373 private { 1374 static struct Data { 1375 Actor self; 1376 ScopedActorError errSt; 1377 1378 ~this() @safe { 1379 if (self.addr.empty) 1380 return; 1381 1382 () @trusted { 1383 self.downHandler = null; 1384 self.defaultHandler = toDelegate(&.defaultHandler); 1385 self.errorHandler = toDelegate(&defaultErrorHandler); 1386 }(); 1387 1388 self.shutdown; 1389 while (self.isAlive) { 1390 self.process(Clock.currTime); 1391 } 1392 } 1393 } 1394 1395 RefCounted!Data data; 1396 } 1397 1398 this(StrongAddress addr, string name) @safe { 1399 data = refCounted(Data(Actor(addr))); 1400 data.get.self.name = name; 1401 } 1402 1403 private void reset() @safe nothrow { 1404 data.get.errSt = ScopedActorError.none; 1405 } 1406 1407 SRequestSend request(TAddress)(TAddress requestTo, SysTime timeout) 1408 if (isAddress!TAddress) { 1409 reset; 1410 auto rs = .request(&data.get.self, underlyingWeakAddress(requestTo), timeout); 1411 return SRequestSend(rs, this); 1412 } 1413 1414 private static struct SRequestSend { 1415 RequestSend rs; 1416 ScopedActor self; 1417 1418 /// Copy constructor 1419 this(ref return typeof(this) rhs) @safe pure nothrow @nogc { 1420 rs = rhs.rs; 1421 self = rhs.self; 1422 } 1423 1424 @disable this(this); 1425 1426 SRequestSendThen send(Args...)(auto ref Args args) { 1427 return SRequestSendThen(.send(rs, args), self); 1428 } 1429 } 1430 1431 private static struct SRequestSendThen { 1432 RequestSendThen rs; 1433 ScopedActor self; 1434 uint backoff; 1435 1436 /// Copy constructor 1437 this(ref return typeof(this) rhs) { 1438 rs = rhs.rs; 1439 self = rhs.self; 1440 backoff = rhs.backoff; 1441 } 1442 1443 @disable this(this); 1444 1445 void dynIntervalSleep() @trusted { 1446 // +100 usecs "feels good", magic number. current OS and 1447 // implementation of message passing isn't that much faster than 1448 // 100us. 
A bit slow behavior, ehum, for a scoped actor is OK. They 1449 // aren't expected to be used for "time critical" sections. 1450 Thread.sleep(backoff.dur!"usecs"); 1451 backoff = min(backoff + 100, 20000); 1452 } 1453 1454 private static struct ValueCapture { 1455 RefCounted!Data data; 1456 1457 void downHandler(ref Actor, DownMsg) @safe nothrow { 1458 data.get.errSt = ScopedActorError.down; 1459 } 1460 1461 void errorHandler(ref Actor, ErrorMsg msg) @safe nothrow { 1462 if (msg.reason == SystemError.requestTimeout) 1463 data.get.errSt = ScopedActorError.timeout; 1464 else 1465 data.get.errSt = ScopedActorError.fatal; 1466 } 1467 1468 void unknownMsgHandler(ref Actor a, ref Variant msg) @safe nothrow { 1469 logAndDropHandler(a, msg); 1470 data.get.errSt = ScopedActorError.unknownMsg; 1471 } 1472 } 1473 1474 void then(T)(T handler, ErrorHandler onError = null) { 1475 scope (exit) 1476 demonitor(rs.rs.self, rs.rs.requestTo); 1477 monitor(rs.rs.self, rs.rs.requestTo); 1478 1479 auto callback = new ValueCapture(self.data); 1480 self.data.get.self.downHandler = &callback.downHandler; 1481 self.data.get.self.defaultHandler = &callback.unknownMsgHandler; 1482 self.data.get.self.errorHandler = &callback.errorHandler; 1483 1484 () @trusted { .thenUnsafe!(T, void)(rs, handler, null, onError); }(); 1485 1486 scope (exit) 1487 () @trusted { 1488 self.data.get.self.downHandler = null; 1489 self.data.get.self.defaultHandler = toDelegate(&.defaultHandler); 1490 self.data.get.self.errorHandler = toDelegate(&defaultErrorHandler); 1491 }(); 1492 1493 auto requestTo = rs.rs.requestTo.lock; 1494 if (!requestTo) 1495 throw new ScopedActorException(ScopedActorError.down); 1496 1497 // TODO: this loop is stupid... should use a conditional variable 1498 // instead but that requires changing the mailbox. later 1499 do { 1500 rs.rs.self.process(Clock.currTime); 1501 // force the actor to be alive even though there are no behaviors. 
1502 rs.rs.self.state_ = ActorState.waiting; 1503 1504 if (self.data.get.errSt == ScopedActorError.none) { 1505 dynIntervalSleep; 1506 } else { 1507 throw new ScopedActorException(self.data.get.errSt); 1508 } 1509 1510 } 1511 while (self.data.get.self.waitingForReply); 1512 } 1513 } 1514 } 1515 1516 ScopedActor scopedActor(string file = __FILE__, uint line = __LINE__)() @safe { 1517 import std.format : format; 1518 1519 return ScopedActor(makeAddress2, format!"ScopedActor.%s:%s"(file, line)); 1520 } 1521 1522 @( 1523 "scoped actor shall throw an exception if the actor that is sent a request terminates or is closed") 1524 unittest { 1525 import my.actor.system; 1526 1527 auto sys = makeSystem; 1528 1529 auto a0 = sys.spawn((Actor* self) { 1530 return impl(self, (ref CSelf!() ctx, int x) { 1531 Thread.sleep(50.dur!"msecs"); 1532 return 42; 1533 }, capture(self), (ref CSelf!() ctx, double x) {}, capture(self), 1534 (ref CSelf!() ctx, string x) { ctx.self.shutdown; return 42; }, capture(self)); 1535 }); 1536 1537 { 1538 auto self = scopedActor; 1539 bool excThrown; 1540 auto stopAt = Clock.currTime + 3.dur!"seconds"; 1541 while (!excThrown && Clock.currTime < stopAt) { 1542 try { 1543 self.request(a0, delay(1.dur!"nsecs")).send(42).then((int x) {}); 1544 } catch (ScopedActorException e) { 1545 excThrown = e.error == ScopedActorError.timeout; 1546 } catch (Exception e) { 1547 logger.info(e.msg); 1548 } 1549 } 1550 assert(excThrown, "timeout did not trigger as expected"); 1551 } 1552 1553 { 1554 auto self = scopedActor; 1555 bool excThrown; 1556 auto stopAt = Clock.currTime + 3.dur!"seconds"; 1557 while (!excThrown && Clock.currTime < stopAt) { 1558 try { 1559 self.request(a0, delay(1.dur!"seconds")).send("hello").then((int x) { 1560 }); 1561 } catch (ScopedActorException e) { 1562 excThrown = e.error == ScopedActorError.down; 1563 } catch (Exception e) { 1564 logger.info(e.msg); 1565 } 1566 } 1567 assert(excThrown, "detecting terminated actor did not trigger as 
expected"); 1568 } 1569 }